package com.kaigejava.rocketmq.maindemo.consumer.transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;


/**
 * @author 凯哥Java
 * @description 事务消息的消费者
 * @company
 * @since 2022/10/19 15:41
 */
public class TransactionMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-group");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("192.168.50.132:9876");
        // 订阅Topic
        consumer.subscribe("transaction-topic", "*");
        //负载均衡模式消费
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数，处理消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                String str = new String(msgs.toString());
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), str);
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}
